package com.open;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.PreparedStatement;
import java.sql.SQLException;


public class FlinkCDCDataStream {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> mysqlDS = env.fromSource(FlinkSourceUtil.getMySqlSource("your_database","your_tablename"), WatermarkStrategy.noWatermarks(), "mysql-source");

        SingleOutputStreamOperator<Tuple2<String,Integer>> mapDS = mysqlDS.map(new MapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String,Integer> map(String value) throws Exception {
                JSONObject mysqlJson = JSONObject.parseObject(value);
                System.out.println(value);
                JSONObject after = mysqlJson.getJSONObject("after");
                String name = after.getString("name");
                int age = after.getIntValue("age");
                return new Tuple2<String,Integer>(name,age);
            }
        });

        SinkFunction<Tuple2<String,Integer>> jdbcSink = JdbcSink.sink(

                "INSERT INTO your_db_name (name, age) VALUES (?,?)",
                new JdbcStatementBuilder<Tuple2<String,Integer>>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, Tuple2<String,Integer> s) throws SQLException {
                        preparedStatement.setString(1,s.f0);
                        preparedStatement.setInt(2,s.f1);
                    }
                }
    ,
                JdbcExecutionOptions.builder()
                        .withMaxRetries(3) // 重试次数
                        .withBatchSize(2) // 批次的大小：条数
                        .withBatchIntervalMs(300) // 批次的时间
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:postgresql://ip:8000/db?currentSchema=schema&useUnicode=true&characterEncoding=UTF-8")
                        .withUsername("username")
                        .withPassword("password")
                        .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
                        .build()
        );

        mapDS.addSink(jdbcSink);

        //6.启动
        env.execute();

    }
}
